有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程如何在Java中暂停/恢复ExecutorService中的所有线程?

我向Java中的executorservice提交了一堆作业,我想暂时暂停所有这些作业。最好的方法是什么?我怎样才能恢复?还是我完全错了?我是否应该遵循其他模式来实现我想要实现的目标(即暂停/恢复执行服务的能力)


共 (5) 个答案

  1. # 1 楼答案

    我对你接受的答案提出了一些批评,但这些批评不是很有建设性。。。这是我的解决方案。我会使用这样的类,然后在需要暂停功能的任何地方/任何时候调用checkIn。在GitHub上找到它

    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Provides a mechanism to pause multiple threads.
     * If wish your thread to participate, then it must regularly check in with an instance of this object.
     * 
     * @author Corin Lawson <corin@phiware.com.au>
     */
    public class Continue {
        private boolean isPaused;
        private ReentrantLock pauseLock = new ReentrantLock();
        private Condition unpaused = pauseLock.newCondition();
    
        public void checkIn() throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.await();
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkInUntil(Date deadline) throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.awaitUntil(deadline);
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkIn(long nanosTimeout) throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.awaitNanos(nanosTimeout);
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkIn(long time, TimeUnit unit) throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.await(time, unit);
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkInUninterruptibly() {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.awaitUninterruptibly();
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public boolean isPaused() {
            return isPaused;
        }
    
        public void pause() {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
        }
    
        public void resume() {
            pauseLock.lock();
            try {
                if (isPaused) {
                    isPaused = false;
                    unpaused.signalAll();
                }
            } finally {
                pauseLock.unlock();
            }
        }
    }
    

    例如:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    
    public class PausableExecutor extends ScheduledThreadPoolExecutor {
        private Continue cont;
    
        public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
            super(corePoolSize, threadFactory);
            cont = c;
        }
    
        protected void beforeExecute(Thread t, Runnable r) {
            cont.checkIn();
            super.beforeExecute(t, r);
        }
    }
    

    这还有一个额外的好处,即您可以通过一次调用Continuepause来暂停多个线程

  2. # 2 楼答案

    为了回答我自己的问题,我在ThreadPoolExecutor{a1}的javadocs中找到了一个PausableThreadPoolExecutor示例。以下是我使用Guava显示器的版本:

    import com.google.common.util.concurrent.Monitor;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    
    public class PausableExecutor extends ScheduledThreadPoolExecutor {
    
        private boolean isPaused;
    
        private final Monitor monitor = new Monitor();
        private final Monitor.Guard paused = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return isPaused;
            }
        };
    
        private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return !isPaused;
            }
        };
    
        public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }
    
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            monitor.enterWhenUninterruptibly(notPaused);
            try {
                monitor.waitForUninterruptibly(notPaused);
            } finally {
                monitor.leave();
            }
        }
    
        public void pause() {
            monitor.enterIf(notPaused);
            try {
                isPaused = true;
            } finally {
                monitor.leave();
            }
        }
    
        public void resume() {
            monitor.enterIf(paused);
            try {
                isPaused = false;
            } finally {
                monitor.leave();
            }
        }
    }
    
  3. # 3 楼答案

    我在executor中寻找暂停/恢复功能,但具有等待当前正在处理的任何任务的附加功能。下面是其他优秀实现的变体,添加了等待函数。我用单线程在executor上测试它。所以基本用法是:

    executor.pause();
    executor.await(10000); // blocks till current tasks processing ends
    

    类别代码:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {      
      public boolean isPaused;
      private ReentrantLock pauseLock = new ReentrantLock();
      private Condition unpaused = pauseLock.newCondition();
      private Latch activeTasksLatch = new Latch();
    
      private class Latch {
        private final Object synchObj = new Object();
        private int count;
    
        public boolean awaitZero(long waitMS) throws InterruptedException {
          long startTime = System.currentTimeMillis();
          synchronized (synchObj) {
            while (count > 0) {
              if ( waitMS != 0) {
                synchObj.wait(waitMS);
                long curTime = System.currentTimeMillis();
                if ( (curTime - startTime) > waitMS ) {                
                  return count <= 0;
                }
              }
              else
                synchObj.wait();
            }
            return count <= 0; 
          }
        }
        public void countDown() {
          synchronized (synchObj) {
            if (--count <= 0) {
              // assert count >= 0;              
              synchObj.notifyAll();
            }
          }
        }
        public void countUp() {
          synchronized (synchObj) {
            count++;
          }
        }    
      }
    
      /**
       * Default constructor for a simple fixed threadpool
       */
      public PausableScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
      }
    
      /**
       * Executed before a task is assigned to a thread.
       */
      @Override
      protected void beforeExecute(Thread t, Runnable r) {
        pauseLock.lock();
        try {
          while (isPaused)
            unpaused.await();
        } catch (InterruptedException ie) {
          t.interrupt();
        } finally {
          pauseLock.unlock();
        }
    
        activeTasksLatch.countUp();
        super.beforeExecute(t, r);
      }
    
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        try {
          super.afterExecute(r, t);
        }
        finally {
          activeTasksLatch.countDown();
        }
      }
    
      /**
       * Pause the threadpool. Running tasks will continue running, but new tasks
       * will not start untill the threadpool is resumed.
       */
      public void pause() {
        pauseLock.lock();
        try {
          isPaused = true;
        } finally {
          pauseLock.unlock();
        }
      }
    
      /**
       * Wait for all active tasks to end.
       */ 
      public boolean await(long timeoutMS) {
        // assert isPaused;
        try {
          return activeTasksLatch.awaitZero(timeoutMS);
        } catch (InterruptedException e) {
          // log e, or rethrow maybe
        }
        return false;
      }
    
      /**
       * Resume the threadpool.
       */
      public void resume() {
        pauseLock.lock();
        try {
          isPaused = false;
          unpaused.signalAll();
        } finally {
          pauseLock.unlock();
        }
      }
    
    }
    
  4. # 4 楼答案

    我知道这是老生常谈,但我尝试了所有这些答案,但没有一个对我试图用可暂停计时器做的事情起作用;一旦恢复(一次完成),他们都会按照计划扔掉所有数据

    相反,我在GitHub*here上找到了这个Timer类。这对我来说非常有效

    *我没有编写此代码,只是找到了它

  5. # 5 楼答案

    问题在于,可运行/可调用程序本身需要检查何时暂停/恢复。有很多方法可以做到这一点,这取决于您对如何最好地做到这一点的要求。无论您需要什么样的解决方案,都可以使等待的线程可中断,从而可以干净地关闭线程